import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';
import '../utils.dart';

/// Test page that registers the `flatMap` and `flatMapIterable` cases.
///
/// Every case is declared from the constructor body through `group` and
/// `test`. Each `expect` call in this class passes a single value and no
/// matcher.
///
/// NOTE(review): what the one-argument `expect` verifies is defined outside
/// this file — confirm against the shared test helpers.
class FlatMapTestPage extends TestPage {
  /// Creates the page with the given title and declares all of its cases.
  FlatMapTestPage(super.title) {
    group('Rx.flatMapIterable', () {
      test('transforms a Stream<Iterable<S>> into individual items', () {
        final items = Rx.range(1, 4)
            .flatMapIterable((int i) => Stream<List<int>>.value(<int>[i]));
        expect(items);
      });

      test('accidental broadcast', () async {
        final source = StreamController<int>();
        final flattened = source.stream
            .flatMapIterable((int i) => Stream<List<int>>.value(<int>[i]));

        // The first listener subscribes directly; the second `listen` is only
        // wrapped in a closure that is handed to `expect`.
        flattened.listen(null);
        expect(() => flattened.listen(null));

        source.add(1);
      });

      test('nullable', () {
        nullableTest<String?>(
          (source) => source.flatMapIterable((item) => Stream.value([item])),
        );
      });
    });

    test('Rx.flatMap', () async {
      final flattened = _getStream().flatMap(_getOtherStream);
      flattened.listen((item) => expect(item));
    });

    test('Rx.flatMap.reusable', () async {
      final reusable = FlatMapStreamTransformer<int, int>(_getOtherStream);

      // The same transformer instance is applied to two separate streams.
      for (var round = 0; round < 2; round++) {
        _getStream().transform(reusable).listen((item) => expect(item));
      }
    });

    test('Rx.flatMap.asBroadcastStream', () async {
      final shared = _getStream().asBroadcastStream().flatMap(_getOtherStream);

      // Two listeners are attached to the same flattened stream.
      shared
        ..listen(null)
        ..listen(null);

      expect(true);
    });

    test('Rx.flatMap.error.shouldThrowA', () async {
      // The error originates in the outer (source) stream.
      final failing = Stream<int>.error(Exception()).flatMap(_getOtherStream);

      failing.listen(
        null,
        onError: (Object error, StackTrace _) {
          expect(error);
        },
      );
    });

    test('Rx.flatMap.error.shouldThrowB', () async {
      // The error originates in the inner (mapped) stream.
      final failing = Stream.value(1).flatMap(
        (_) => Stream<void>.error(Exception('Catch me if you can!')),
      );

      failing.listen(
        null,
        onError: (Object error, StackTrace _) {
          expect(error);
        },
      );
    });

    test('Rx.flatMap.error.shouldThrowC', () async {
      // The mapper itself throws instead of returning a stream.
      final failing =
          Stream.value(1).flatMap<void>((_) => throw Exception('oh noes!'));

      failing.listen(
        null,
        onError: (Object error, StackTrace _) {
          expect(error);
        },
      );
    });

    test('Rx.flatMap.pause.resume', () async {
      final flattened = Stream.value(0).flatMap((_) => Stream.value(1));

      // Declared `late` so the data callback can cancel its own subscription.
      late StreamSubscription<int> sub;
      sub = flattened.listen((item) {
        expect(item);

        sub.cancel();
      });

      sub
        ..pause()
        ..resume();
    });

    test('Rx.flatMap.chains', () {
      final chained = Stream.value(1)
          .flatMap((_) => Stream.value(2))
          .flatMap((_) => Stream.value(3));
      expect(chained);
    });

    test('Rx.flatMap accidental broadcast', () async {
      final source = StreamController<int>();
      final flattened = source.stream.flatMap((_) => Stream<int>.empty());

      flattened.listen(null);
      expect(() => flattened.listen(null));

      source.add(1);
    });

    test('Rx.flatMap(maxConcurrent: 1)', () {
      // Emits [value] after (5 - value) * 100 ms, so larger values are
      // scheduled with shorter delays.
      Stream<int> delayed(int value) =>
          Rx.timer(value, Duration(milliseconds: (5 - value) * 100));

      // Every inner stream is a plain timer.
      final plain = Stream.fromIterable([1, 2, 3, 4])
          .flatMap(delayed, maxConcurrent: 1);
      expect(plain);

      // The mapper throws for the first value.
      final mapperThrows = Stream.fromIterable([1, 2, 3, 4]).flatMap(
        (value) => value == 1 ? throw Exception() : delayed(value),
        maxConcurrent: 1,
      );
      expect(mapperThrows);

      // The inner stream for the first value emits an error.
      final innerFails = Stream.fromIterable([1, 2, 3, 4]).flatMap(
        (value) =>
            value == 1 ? Stream<int>.error(Exception()) : delayed(value),
        maxConcurrent: 1,
      );
      expect(innerFails);
    });

    test('Rx.flatMap(maxConcurrent: 2)', () async {
      // Incremented when an inner stream is created by the deferred factory
      // and decremented when that inner stream is done.
      var running = 0;

      final flattened = Stream.fromIterable([1, 2, 3, 4]).flatMap(
        (value) {
          final inner = Rx.defer(() {
            running += 1;
            expect(running);

            final factor = value.isOdd ? 5 : 6 - value;
            return Rx.timer(value, Duration(milliseconds: factor * 100));
          });
          return inner.doOnDone(() => --running);
        },
        maxConcurrent: 2,
      );

      expect(flattened);
    });

    test('Rx.flatMap(maxConcurrent: 3)', () async {
      // Incremented when an inner stream is created by the deferred factory
      // and decremented when that inner stream is done.
      var running = 0;

      final flattened = Stream.fromIterable([1, 2, 3, 4, 5, 6]).flatMap(
        (value) {
          final inner = Rx.defer(() {
            running += 1;
            expect(running);

            final factor = value <= 3 ? 5 - value : value - 2;
            return Rx.timer(value, Duration(milliseconds: factor * 100));
          });
          return inner.doOnDone(() => --running);
        },
        maxConcurrent: 3,
      );

      expect(flattened);
    });

    test('Rx.flatMap.cancel', () {
      final subscription =
          _getStream().flatMap(_getOtherStream).listen((_) {});
      expect(subscription.cancel());
    });

    test('Rx.flatMap(maxConcurrent: 1).cancel', () {
      final subscription = _getStream()
          .flatMap(_getOtherStream, maxConcurrent: 1)
          .listen((_) {});
      expect(subscription.cancel());
    });

    test('Rx.flatMap.take.cancel', () {
      final subscription = _getStream()
          .flatMap(_getOtherStream)
          .take(1)
          .listen((item) => expect(item));
      expect(subscription);
    });

    test('Rx.flatMap(maxConcurrent: 1).take.cancel', () {
      final subscription = _getStream()
          .flatMap(_getOtherStream, maxConcurrent: 1)
          .take(1)
          .listen((item) => expect(item));
      expect(subscription);
    });

    test('Rx.flatMap(maxConcurrent: 2).take.cancel', () {
      final subscription = _getStream()
          .flatMap(_getOtherStream, maxConcurrent: 2)
          .take(1)
          .listen((item) => expect(item));
      expect(subscription);
    });

    test('Rx.flatMap.nullable', () {
      nullableTest<String?>(
        (source) => source.flatMap((item) => Stream.value(item)),
      );
    });
  }
}

/// Returns a new stream that emits 1, 2 and 3, in that order.
Stream<int> _getStream() {
  return Stream<int>.fromIterable(const <int>[1, 2, 3]);
}

/// Returns a stream that emits [value] once and then closes.
///
/// The emission is delayed by 15 ms when [value] is 1, by 10 ms when it is 2,
/// and by 5 ms for any other value. The timer is started as soon as this
/// function is called, not when the returned stream is first listened to.
Stream<int> _getOtherStream(int value) {
  final int delayMs;
  if (value == 1) {
    delayMs = 15;
  } else if (value == 2) {
    delayMs = 10;
  } else {
    delayMs = 5;
  }

  final sink = StreamController<int>();
  Timer(Duration(milliseconds: delayMs), () {
    sink
      ..add(value)
      ..close();
  });
  return sink.stream;
}